/*
 Navicat Premium Data Transfer

 Source Server         : 120.92.51.62-开发机-postgres
 Source Server Type    : PostgreSQL
 Source Server Version : 110007
 Source Host           : 120.92.51.62:5432
 Source Catalog        : kuangshan-server
 Source Schema         : synch

 Target Server Type    : PostgreSQL
 Target Server Version : 110007
 File Encoding         : 65001

 Date: 04/03/2021 15:07:54
*/


-- ----------------------------
-- Sequence structure for upms_log_id_seq
-- ----------------------------
-- Numeric source for synch.synch_data ids: used directly by the column default
-- and, prefixed with the host name, by synch.synch_data_insert.
DROP SEQUENCE IF EXISTS "synch"."upms_log_id_seq";
CREATE SEQUENCE "synch"."upms_log_id_seq"
  INCREMENT BY 1
  MINVALUE 1
  MAXVALUE 9223372036854775807
  START WITH 1
  CACHE 1;

-- ----------------------------
-- Table structure for synch_channel
-- ----------------------------
-- One row per channel. synch.synch_data_insert writes one synch_data row
-- per channel row for every captured change.
DROP TABLE IF EXISTS "synch"."synch_channel";
CREATE TABLE "synch"."synch_channel" (
  "id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
  "label" varchar(255) COLLATE "pg_catalog"."default"
)
;
-- Column comment text below is stored in the catalog: 'primary key', 'channel name'.
COMMENT ON COLUMN "synch"."synch_channel"."id" IS '主键';
COMMENT ON COLUMN "synch"."synch_channel"."label" IS '通道名称';

-- ----------------------------
-- Records of synch_channel
-- ----------------------------
-- Columns are named explicitly so the seed row does not depend on column order.
INSERT INTO "synch"."synch_channel" ("id", "label") VALUES ('kuang-001', '矿1');

-- ----------------------------
-- Table structure for synch_data
-- ----------------------------
-- Change log. synch.synch_data_insert writes one row per channel for each
-- captured change; an AFTER INSERT trigger on this table
-- (synch_trigger_bind_to_synch_data_table) processes newly inserted rows.
DROP TABLE IF EXISTS "synch"."synch_data";
CREATE TABLE "synch"."synch_data" (
  "id"          character varying(64)  COLLATE "pg_catalog"."default" DEFAULT nextval('"synch".upms_log_id_seq'::regclass) NOT NULL,
  "tab_schema"  character varying(255) COLLATE "pg_catalog"."default",
  "tab_name"    character varying(255) COLLATE "pg_catalog"."default",
  "opt"         character varying(32)  COLLATE "pg_catalog"."default",
  "channel_id"  character varying(64)  COLLATE "pg_catalog"."default",
  "data_new"    jsonb,
  "data_old"    jsonb,
  "create_time" timestamp(6) without time zone,
  "table_id"    bigint
);

-- Catalog comments (text kept verbatim). In order: primary key, schema,
-- table name, operation type, channel, new data, old data, creation time,
-- synch_table id.
COMMENT ON COLUMN "synch"."synch_data"."id"          IS '主键';
COMMENT ON COLUMN "synch"."synch_data"."tab_schema"  IS '模式';
COMMENT ON COLUMN "synch"."synch_data"."tab_name"    IS '表名';
COMMENT ON COLUMN "synch"."synch_data"."opt"         IS '操作类型';
COMMENT ON COLUMN "synch"."synch_data"."channel_id"  IS '通道';
COMMENT ON COLUMN "synch"."synch_data"."data_new"    IS '新数据';
COMMENT ON COLUMN "synch"."synch_data"."data_old"    IS '旧数据';
COMMENT ON COLUMN "synch"."synch_data"."create_time" IS '创建时间';
COMMENT ON COLUMN "synch"."synch_data"."table_id"    IS 'synch_table_id';

-- ----------------------------
-- Table structure for synch_host_config
-- ----------------------------
-- Holds the name of this host. synch.synch_data_insert prefixes generated
-- synch_data ids with it, and synch_trigger_bind_to_synch_data_table skips
-- rows whose id starts with it. Both read a single row from this table.
DROP TABLE IF EXISTS "synch"."synch_host_config";
CREATE TABLE "synch"."synch_host_config" (
  "host_name" varchar(255) COLLATE "pg_catalog"."default"
)
;
-- Column comment text below is stored in the catalog: 'name of the synch endpoint'.
COMMENT ON COLUMN "synch"."synch_host_config"."host_name" IS '同步端名称';

-- ----------------------------
-- Records of synch_host_config
-- ----------------------------
-- Columns are named explicitly so the seed row does not depend on column order.
INSERT INTO "synch"."synch_host_config" ("host_name") VALUES ('kuang-master');

-- ----------------------------
-- Table structure for synch_table
-- ----------------------------
-- Registry of tables to capture. Triggers on this table bind or unbind the
-- capture triggers on the named table and, when init_data = 0, log the
-- table's existing rows on insert.
DROP TABLE IF EXISTS "synch"."synch_table";
CREATE TABLE "synch"."synch_table" (
  "id"         bigint                 NOT NULL,
  "tab_schema" character varying(255) COLLATE "pg_catalog"."default" NOT NULL,
  "tab_name"   character varying(255) COLLATE "pg_catalog"."default" NOT NULL,
  "init_data"  integer                NOT NULL
);

-- Catalog comments (text kept verbatim). In order: primary key, schema,
-- table name, "0: initialise table data automatically, -1: do not initialise".
COMMENT ON COLUMN "synch"."synch_table"."id"         IS '主键';
COMMENT ON COLUMN "synch"."synch_table"."tab_schema" IS '模式';
COMMENT ON COLUMN "synch"."synch_table"."tab_name"   IS '表名';
COMMENT ON COLUMN "synch"."synch_table"."init_data"  IS '0:自动初始化表数据，-1：不初始化数据';

-- ----------------------------
-- Function structure for synch_data_bind_to_anytable
-- ----------------------------
-- Row-level trigger function for captured tables (attached by
-- synch.synch_trigger_create_trigger as BEFORE DELETE and AFTER INSERT OR UPDATE).
--
-- For INSERT / UPDATE / DELETE it looks in synch.synch_data for a row that
-- describes exactly this change (same schema, table, operation and old/new
-- images). If one exists it is deleted and nothing is logged; otherwise the
-- change is logged through synch.synch_data_insert. Other operations are ignored.
-- Always returns OLD.
DROP FUNCTION IF EXISTS "synch"."synch_data_bind_to_anytable"();
CREATE OR REPLACE FUNCTION "synch"."synch_data_bind_to_anytable"()
  RETURNS "pg_catalog"."trigger" AS $BODY$

declare
	-- jsonb images of the affected row; either may be NULL (the lookup below
	-- treats NULL as "no image" and matches it with NULL-safe comparison).
	data_json_new jsonb := null;
	data_json_old jsonb := null;

	-- Trigger context copied into varchar locals, matching the type of the
	-- synch_data columns they are compared with and of the parameters of
	-- synch.synch_data_insert. Named differently from the TG_* specials so
	-- nothing is shadowed.
	v_schema varchar := tg_table_schema;
	v_table  varchar := tg_table_name;
	v_op     varchar := tg_op;

	-- id of an already existing synch_data row describing this same change
	v_matching_id varchar;

begin

	if v_op not in ('INSERT', 'DELETE', 'UPDATE') then
		return old;
	end if;

	data_json_new = row_to_json(new)::jsonb;
	data_json_old = row_to_json(old)::jsonb;

	-- Static, parameterised lookup. IS NOT DISTINCT FROM gives "equal, or both
	-- NULL", which is what the previous string-built query expressed with
	-- separate "= <literal>" / "is null" branches.
	select d.id
	into v_matching_id
	from synch.synch_data d
	where d.tab_schema = v_schema
	  and d.tab_name = v_table
	  and d.opt = v_op
	  and d.data_new is not distinct from data_json_new
	  and d.data_old is not distinct from data_json_old
	limit 1;

	if v_matching_id is not null then
		-- NOTE(review): presumably this is the row that
		-- synch_trigger_bind_to_synch_data_table has just replayed onto this
		-- table; consuming it here avoids logging the replayed change again.
		delete from synch.synch_data where id = v_matching_id;
		return old;
	end if;

	-- No matching row: record the change (one synch_data row per channel).
	perform synch.synch_data_insert(v_schema, v_table, v_op, data_json_new, data_json_old);

	return old;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_data_insert
-- ----------------------------
-- Logs one change into synch.synch_data, once per row of synch.synch_channel.
--
-- Parameters:
--   tab_schema, tab_name  table the change belongs to
--   opt                   operation name stored in synch_data.opt
--   data_new, data_old    jsonb row images (either may be NULL)
-- Each inserted row gets id '<host_name>-<nextval of synch.upms_log_id_seq>',
-- create_time now(), and table_id set to the id of the matching
-- synch.synch_table row (NULL when there is none).
-- Returns 0.
DROP FUNCTION IF EXISTS "synch"."synch_data_insert"("tab_schema" varchar, "tab_name" varchar, "opt" varchar, "data_new" jsonb, "data_old" jsonb);
CREATE OR REPLACE FUNCTION "synch"."synch_data_insert"("tab_schema" varchar, "tab_name" varchar, "opt" varchar, "data_new" jsonb, "data_old" jsonb)
  RETURNS "pg_catalog"."int4" AS $BODY$

declare
	v_table_id  int8;
	v_host_name varchar;

begin
	-- The parameters share their names with columns of synch_table, so they
	-- are qualified with the function name. That lets this be a static query
	-- instead of a string-built one.
	select t.id
	into v_table_id
	from synch.synch_table t
	where t.tab_schema = synch_data_insert.tab_schema
	  and t.tab_name = synch_data_insert.tab_name
	limit 1;

	-- The host name does not change between channels: read it once.
	select h.host_name
	into v_host_name
	from synch.synch_host_config h
	limit 1;

	-- One log row per channel, each with its own sequence value.
	insert into synch.synch_data
		(id, tab_schema, tab_name, opt, channel_id, data_new, data_old, create_time, table_id)
	select
		v_host_name || '-' || nextval('synch.upms_log_id_seq')::varchar,
		synch_data_insert.tab_schema,
		synch_data_insert.tab_name,
		synch_data_insert.opt,
		c.id,
		synch_data_insert.data_new,
		synch_data_insert.data_old,
		now(),
		v_table_id
	from synch.synch_channel c;

	return 0;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_bind_to_synch_data_table
-- ----------------------------
-- Row trigger function for synch.synch_data (bound AFTER INSERT).
--
-- Rows whose id starts with '<this host name>-' are left alone. Any other
-- inserted row is replayed onto the table it names (tab_schema.tab_name):
--   opt = 'INSERT'  inserts data_new as a row
--   opt = 'DELETE'  deletes the rows whose columns all match data_old
--   opt = 'UPDATE'  sets every column from data_new on the rows whose
--                   columns all match data_old
-- Afterwards, if the table is not registered in synch.synch_table, the
-- synch_data row itself is deleted.
--
-- Raises an exception when the row image needed to build the statement is
-- missing or empty, instead of running an unfiltered DELETE/UPDATE or a
-- malformed statement.
-- Returns OLD for non-INSERT trigger events, NEW otherwise.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_bind_to_synch_data_table"();
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_bind_to_synch_data_table"()
  RETURNS "pg_catalog"."trigger" AS $BODY$

declare
	host_config record;

	qualified_table varchar;  -- quoted schema.table of the target
	set_clause      varchar;  -- "col = value, ..." built from data_new
	where_clause    varchar;  -- "col = value and ..." built from data_old
	apply_sql       varchar;  -- statement executed against the target table

begin

	if tg_op != 'INSERT' then
		return old;
	end if;

	-- Rows generated on this host carry the host name as id prefix
	-- (see synch.synch_data_insert); they are not replayed here.
	select * from synch.synch_host_config into host_config;
	if new.id like host_config.host_name || '-%' then
		return new;
	end if;

	qualified_table = quote_ident(new.tab_schema) || '.' || quote_ident(new.tab_name);

	-- Row-matching predicate shared by DELETE and UPDATE. Keys are column
	-- names and go through %I so mixed-case or reserved-word columns work;
	-- values go through %L. JSON null becomes "is null".
	if new.opt in ('DELETE', 'UPDATE') then
		select string_agg(
				case
					when kv.value is null then format('%I is null', kv.key)
					else format('%I = %L', kv.key, kv.value)
				end,
				' and ')
		into where_clause
		from jsonb_each_text(new.data_old) as kv(key, value);

		if where_clause is null then
			raise exception 'synch_data row %: data_old is empty, refusing to run an unfiltered % on %',
				new.id, new.opt, qualified_table;
		end if;
	end if;

	if new.opt = 'INSERT' then
		if new.data_new is null then
			raise exception 'synch_data row %: data_new is null, nothing to insert into %',
				new.id, qualified_table;
		end if;

		-- The JSON document is passed as a bind parameter, not spliced into the text.
		apply_sql = 'insert into ' || qualified_table
			|| ' select * from jsonb_populate_record(null::' || qualified_table || ', $1)';
		execute apply_sql using new.data_new;

	elsif new.opt = 'DELETE' then
		apply_sql = 'delete from ' || qualified_table || ' where ' || where_clause;
		raise notice 'delete_sql -- >  %', apply_sql;
		execute apply_sql;

	elsif new.opt = 'UPDATE' then
		-- %L renders a NULL value as the keyword NULL, so "col = NULL" in the
		-- SET list needs no special case.
		select string_agg(format('%I = %L', kv.key, kv.value), ', ')
		into set_clause
		from jsonb_each_text(new.data_new) as kv(key, value);

		if set_clause is null then
			raise exception 'synch_data row %: data_new is empty, nothing to update on %',
				new.id, qualified_table;
		end if;

		apply_sql = 'update ' || qualified_table || ' set ' || set_clause || ' where ' || where_clause;
		raise notice 'update_sql-> %', apply_sql;
		execute apply_sql;
	end if;

	-- NOTE(review): for a registered table the capture trigger on it appears
	-- to consume this row itself (see synch_data_bind_to_anytable); for an
	-- unregistered table nothing else would, so it is removed here. Confirm.
	if not exists (
		select 1
		from synch.synch_table t
		where t.tab_schema = new.tab_schema
		  and t.tab_name = new.tab_name
	) then
		delete from synch.synch_data where id = new.id;
	end if;

	return new;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_bind_to_synch_table
-- ----------------------------
-- Row trigger function for synch.synch_table (bound BEFORE DELETE and
-- AFTER INSERT OR UPDATE). Keeps the capture triggers on the registered
-- table in step with the registry:
--   INSERT  drop any existing capture triggers on the new table, then create them
--   DELETE  remove the table's synch_data rows and drop its capture triggers
--   UPDATE  when tab_schema/tab_name changed, drop the triggers on the old
--           table and (re)create them on the new one
-- Always returns OLD.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_bind_to_synch_table"();
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_bind_to_synch_table"()
  RETURNS "pg_catalog"."trigger" AS $BODY$

begin

	if tg_op = 'INSERT' then
		perform synch.synch_trigger_drop_trigger(new.tab_schema, new.tab_name);
		perform synch.synch_trigger_create_trigger(new.tab_schema, new.tab_name);

	elsif tg_op = 'DELETE' then
		delete from synch.synch_data where table_id = old.id;
		perform synch.synch_trigger_drop_trigger(old.tab_schema, old.tab_name);

	elsif tg_op = 'UPDATE' then
		-- The previous guards compared never-assigned locals with "!= null",
		-- which is never true, so an UPDATE never re-bound anything.
		-- Re-bind only when the registered table actually changed.
		if old.tab_schema is distinct from new.tab_schema
			or old.tab_name is distinct from new.tab_name
		then
			perform synch.synch_trigger_drop_trigger(old.tab_schema, old.tab_name);
			-- Same drop-then-create sequence as the INSERT branch, so a
			-- pre-existing trigger on the new table does not make CREATE fail.
			perform synch.synch_trigger_drop_trigger(new.tab_schema, new.tab_name);
			perform synch.synch_trigger_create_trigger(new.tab_schema, new.tab_name);
		end if;
	end if;

	return old;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_create_trigger
-- ----------------------------
-- Creates the two capture triggers on tab_schema.tab_name, both executing
-- synch.synch_data_bind_to_anytable():
--   <tab_name>_trigger_delete  BEFORE DELETE, per row
--   <tab_name>_trigger_update  AFTER INSERT OR UPDATE, per row
-- Returns 0. Fails if a trigger of that name already exists on the table.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_create_trigger"("tab_schema" varchar, "tab_name" varchar);
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_create_trigger"("tab_schema" varchar, "tab_name" varchar)
  RETURNS "pg_catalog"."int4" AS $BODY$

begin
	-- The complete trigger name is quoted as one identifier (%I). Quoting only
	-- the table-name part and appending the suffix produced invalid SQL for
	-- any table name that needs quoting. For names that need no quoting the
	-- generated statement is unchanged.
	execute format(
		'CREATE TRIGGER %I BEFORE DELETE ON %I.%I
			FOR EACH ROW
			EXECUTE PROCEDURE "synch"."synch_data_bind_to_anytable"()',
		tab_name || '_trigger_delete', tab_schema, tab_name);

	execute format(
		'CREATE TRIGGER %I AFTER INSERT OR UPDATE ON %I.%I
			FOR EACH ROW
			EXECUTE PROCEDURE "synch"."synch_data_bind_to_anytable"()',
		tab_name || '_trigger_update', tab_schema, tab_name);

	return 0;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_drop_trigger
-- ----------------------------
-- Drops the capture triggers <tab_name>_trigger_delete and
-- <tab_name>_trigger_update from tab_schema.tab_name if they exist.
-- Emits the two arguments as NOTICEs. Returns 0.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_drop_trigger"("tab_schema" varchar, "tab_name" varchar);
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_drop_trigger"("tab_schema" varchar, "tab_name" varchar)
  RETURNS "pg_catalog"."int4" AS $BODY$

begin
	RAISE notice 'tab_schema---> %', tab_schema ;
	RAISE notice 'tab_name---> %', tab_name ;

	-- The complete trigger name is quoted as one identifier (%I). Quoting only
	-- the table-name part and appending the suffix produced invalid SQL for
	-- any table name that needs quoting. For names that need no quoting the
	-- generated statement is unchanged.
	execute format(
		'DROP TRIGGER IF EXISTS %I ON %I.%I',
		tab_name || '_trigger_delete', tab_schema, tab_name);

	execute format(
		'DROP TRIGGER IF EXISTS %I ON %I.%I',
		tab_name || '_trigger_update', tab_schema, tab_name);

	return 0;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_foreign_key_bind_to_synch_channel_table
-- ----------------------------
-- Row trigger function for synch.synch_channel (bound AFTER DELETE).
-- When a channel row is deleted, removes every synch.synch_data row that
-- belongs to that channel. Any other trigger event does nothing.
-- Always returns OLD.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_foreign_key_bind_to_synch_channel_table"();
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_foreign_key_bind_to_synch_channel_table"()
  RETURNS "pg_catalog"."trigger" AS $BODY$

begin
	if tg_op = 'DELETE' then
		delete from synch.synch_data where channel_id = old.id;
	end if;

	return old;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for synch_trigger_init_table_bind_to_synch_table
-- ----------------------------
-- Row trigger function for synch.synch_table (bound AFTER INSERT).
-- When a table is registered with init_data = 0, every row currently in that
-- table is logged through synch.synch_data_insert as an 'INSERT' with the row
-- as data_new and no data_old. Any other init_data value (or trigger event)
-- does nothing. Always returns OLD.
DROP FUNCTION IF EXISTS "synch"."synch_trigger_init_table_bind_to_synch_table"();
CREATE OR REPLACE FUNCTION "synch"."synch_trigger_init_table_bind_to_synch_table"()
  RETURNS "pg_catalog"."trigger" AS $BODY$

declare
	existing_row record;

begin
	-- A NULL init_data makes the condition NULL, which IF treats as false.
	if tg_op = 'INSERT' and new.init_data = 0 then
		for existing_row in
			execute 'select * from '|| quote_ident(new.tab_schema) ||'.'|| quote_ident(new.tab_name)
		loop
			perform synch.synch_data_insert(
				new.tab_schema,
				new.tab_name,
				'INSERT',
				row_to_json(existing_row)::jsonb,
				null::jsonb);
		end loop;
	end if;

	return old;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Function structure for test_insert_web
-- ----------------------------
-- Test helper: inserts 100 rows into public.blade_user, numbered 1..100.
-- The number is used as the first column value and as the prefix of the
-- '<n>admin' value; all other values are fixed. Returns 0.
-- NOTE(review): the INSERT has no column list because public.blade_user is
-- not defined in this file; it relies on that table's column order.
DROP FUNCTION IF EXISTS "synch"."test_insert_web"();
CREATE OR REPLACE FUNCTION "synch"."test_insert_web"()
  RETURNS "pg_catalog"."int4" AS $BODY$

begin
	for row_no in 1..100 loop
		INSERT INTO "public"."blade_user" VALUES (row_no, '000000', NULL, 1, row_no||'admin', '90b9aa7e25f80cf4f64e990b78a9fc5ebd6cecad', '管理员', '管理员', 'https://gw.alipayobjects.com/zos/rmsportal/BiazfanxmamNRoxxVxka.png', 'admin@bladex.vip', '123333333333', '2018-08-08 00:00:00', 1, '1123598816738675201', '1123598813738675201', '1123598817738675201', 1123598821738675201, 1123598813738675201, '2018-08-08 00:00:00', 1123598821738675201, '2018-08-08 00:00:00', 1, 0);
	end loop;

	return 0;
end;

$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

-- ----------------------------
-- Alter sequences owned by
-- ----------------------------
-- Restore the sequence position: 134531 is marked as already used, so the
-- next nextval() continues after it.
SELECT pg_catalog.setval('"synch"."upms_log_id_seq"'::regclass, 134531, true);

-- ----------------------------
-- Triggers structure for table synch_channel
-- ----------------------------
-- After a channel row is deleted, its synch_data rows are removed.
CREATE TRIGGER "auto_foreign_key"
  AFTER DELETE
  ON "synch"."synch_channel"
  FOR EACH ROW
  EXECUTE PROCEDURE "synch"."synch_trigger_foreign_key_bind_to_synch_channel_table"();

-- ----------------------------
-- Primary Key structure for table synch_channel
-- ----------------------------
ALTER TABLE "synch"."synch_channel"
  ADD CONSTRAINT "data_config_pkey" PRIMARY KEY ("id");

-- ----------------------------
-- Triggers structure for table synch_data
-- ----------------------------
-- Every inserted log row is handed to synch_trigger_bind_to_synch_data_table.
CREATE TRIGGER "auto_insert_data_update"
  AFTER INSERT
  ON "synch"."synch_data"
  FOR EACH ROW
  EXECUTE PROCEDURE "synch"."synch_trigger_bind_to_synch_data_table"();

-- ----------------------------
-- Primary Key structure for table synch_data
-- ----------------------------
ALTER TABLE "synch"."synch_data"
  ADD CONSTRAINT "data_synch_pkey" PRIMARY KEY ("id");

-- ----------------------------
-- Triggers structure for table synch_table
-- ----------------------------
-- Two bindings of synch_trigger_bind_to_synch_table (before delete, after
-- insert/update) plus the initial-data trigger that runs after insert.
CREATE TRIGGER "auto_bind_trigger_delete"
  BEFORE DELETE
  ON "synch"."synch_table"
  FOR EACH ROW
  EXECUTE PROCEDURE "synch"."synch_trigger_bind_to_synch_table"();

CREATE TRIGGER "auto_bind_trigger_update"
  AFTER INSERT OR UPDATE
  ON "synch"."synch_table"
  FOR EACH ROW
  EXECUTE PROCEDURE "synch"."synch_trigger_bind_to_synch_table"();

CREATE TRIGGER "auto_init_table_rows_insert"
  AFTER INSERT
  ON "synch"."synch_table"
  FOR EACH ROW
  EXECUTE PROCEDURE "synch"."synch_trigger_init_table_bind_to_synch_table"();

-- ----------------------------
-- Primary Key structure for table synch_table
-- ----------------------------
ALTER TABLE "synch"."synch_table"
  ADD CONSTRAINT "synch_table_pkey" PRIMARY KEY ("id");
